#!/usr/bin/perl

use strict;
use warnings;
use File::Spec;
use File::Path qw(mkpath);
use Data::Dumper;
use Encode;
use File::Copy;
use JSON;
use Try::Tiny;
use Net::SFTP::Foreign;
use Fcntl ':mode';
###扫描指定的目录当目录下有文件则启动进程，如无文件且非当日目录则删除
#



my (%config,%logger,%platcfg,%hashtmp,%dbcfg,%tabcfg,%etl,%ftp);
$config{dirs}{path_curf} = File::Spec->rel2abs(__FILE__);
($config{dirs}{scrvol},$config{dirs}{scrdir},$config{dirs}{file_script}) = File::Spec->splitpath($config{dirs}{path_curf});
$config{dirs}{root} = $config{dirs}{scrvol}.$config{dirs}{scrdir};

$config{dirs}{conf} = $config{dirs}{root}."config/";
$config{dirs}{logger} = $config{dirs}{root}."logs/";
$config{dirs}{tmp} = $config{dirs}{root}."files/";
$config{dirs}{class} = $config{dirs}{root}."class/";
#$config{dirs}{scan} = $config{dirs}{tmp}."scanroot/";
#初始化扫描文件目录，后期下面会读配置文件
$config{dirs}{scan} = "/home/xiqing/apps/Projects/Market/file/recv/";
#$config{dirs}{scan} = "/home/xiqing/apps/Projects/Market/file/sendcache/";
$config{files}{lockfile} = $config{dirs}{logger}."run.lock";
$config{files}{exitfile} = $config{dirs}{logger}."run.exit";


try {
	if (-e $config{files}{lockfile}){
		print "The Process is running.. \r\n";
		exit;
	}	
	open(FILE_LOCK, ">$config{files}{lockfile}");
	close(FILE_LOCK);	
		
	#导入类库	
	opendir(DIR, "$config{dirs}{class}") || die "Cannot open dir : $config{dirs}{class} ";
	foreach my $class_file (readdir(DIR)) {
		if(-f "$config{dirs}{class}/$class_file") {
			require($config{dirs}{class}.$class_file);
		}
	}
	closedir(DIR);
	
		
Process_Start:
	
	if (-e $config{files}{exitfile}){
		goto Process_End;
	}
	
	# ------------------------------
	# &nowsrl()
	# 当前流水
	# ------------------------------
	sub nowsrl {
	         my ( $s,$min,$h,$d,$m,$y ) = (localtime(time()))[0,1,2,3,4,5];
	#	 my ( $s,$min,$h,$d,$m,$y,$sss ) = (localtime(time()))[0,1,2,3,4,5,6];
	         $y += 1900;
	         $m ++;
	         return sprintf("%4d%02d%02d%02d%02d%02d%02d",$y,$m,$d,$h,$min,$s,$s*1000);
	}
	
	$logger{date}{year} = substr(&nowsrl,0,4);
	$logger{date}{month} = substr(&nowsrl,0,6);
	$logger{date}{date} = substr(&nowsrl,0,8);
	$logger{init}{dirs} = $config{dirs}{logger}.$logger{date}{year}.'/'.$logger{date}{month}.'/';
	$logger{init}{logfile} = $logger{init}{dirs}.'init_'.$logger{date}{date}.'.log';
		
	if (-e $logger{init}{dirs}) {
	} else {
		mkpath($logger{init}{dirs},0,0775);
	}
		
	&printlog($logger{init}{logfile},&nowtime." Process Starting \r\n");
	
	$config{files}{etl} = $config{dirs}{conf}.'etl.config';
	open(FILE,"<",$config{files}{etl})||die"cannot open the file: $! \r\n";
	while (my $content = <FILE>){
		chomp $content; 
		if (length($content) == 0 || substr($content,0,1) eq "#") {
			next;
		} else {
			my ($name,$value) = split(/\|/,$content);
			$etl{$name} = from_json($value);
		}
	}
	close FILE;
	
	$config{files}{dbconfig} = $config{dirs}{conf}.'db.config';
	open(FILE,"<",$config{files}{dbconfig})||die"cannot open the file: $! \r\n";
	while (my $content = <FILE>){
    	chomp $content; 
		if (length($content) == 0 || substr($content,0,1) eq "#") {
			next;
		} else {
			my ($name,$value) = split(/\|/,$content);
			$dbcfg{$name} = from_json($value);
		}
	}
	close FILE;
	
	$config{files}{ftp} = $config{dirs}{conf}.'ftp.config';
	open(FILE,"<",$config{files}{ftp})||die"cannot open the file: $! \r\n";
	while (my $content = <FILE>){
		chomp $content; 
		if (length($content) == 0 || substr($content,0,1) eq "#") {
			next;
		} else {
			my ($name,$value) = split(/\|/,$content);
			$ftp{$name} = from_json($value);
		}
	}
	close FILE;
	
	$config{files}{table} = $config{dirs}{conf}.'table.config';
	open(FILE,"<",$config{files}{table})||die"cannot open the file: $! \r\n";
	while (my $content = <FILE>){
		chomp $content; 
		if (length($content) == 0 || substr($content,0,1) eq "#") {
			next;
		} else {
			my ($name,$value) = split(/\|/,$content);
			$tabcfg{$name} = from_json($value);
		}
	}
	close FILE;
	
#	$config{files}{config} = $config{dirs}{conf}.'platid.config';
#	open(FILE,"<",$config{files}{config}) || die "cannot open the file: $config{files}{config} \n";
#	while (my $content = <FILE>) {
#		chomp $content; 
#		if ($content) {
#			if (substr($content,0,1) eq "#") {
#				next;
#			} else {
#				my %hashtmp;
#				($hashtmp{platid},$hashtmp{platname},$hashtmp{dirs},$hashtmp{runscript},$hashtmp{runlock}) = split(/\|/,$content);
#				$platcfg{$hashtmp{platid}} = \%hashtmp;
#			}
#		}
#	}
#	close FILE;
	#print Dumper(\%platcfg);
	$config{dirs}{datafile} = $config{dirs}{tmp}.'datafile/';
	$config{dirs}{tabletmp} = $config{dirs}{tmp}.'table/';
	&printlog($logger{init}{logfile},&nowtime." init successful \r\n");
	&printlog($logger{init}{logfile},&nowtime." Starting ETL ... \r\n");
	
	#循环ETL任务列表
	foreach my $key ( keys %etl ) {
		my $subhash = $etl{$key};
		my %subhash = %$subhash;
		my %info;
		$subhash{reloaddir} = $config{dirs}{tmp}.'reload/';
		&printlog($logger{init}{logfile},&nowtime." etlname: $key ");
		if ($subhash{isvalid} eq "1") {
			#etl任务有效
			&printlog($logger{init}{logfile}," ---> valid:deal,Connect Remote... \r\n");
			try { 
				if ($ftp{$subhash{ftp}}{type} eq "sftp") {
					#创建本地文件夹
					#$dircfg{hisdirtmp} = $dircfg{hisdir}.substr(&nowsrl,0,4).'/'.substr(&nowsrl,0,6).'/'.substr(&nowsrl,0,8).'/';
					$info{localdir} = $config{dirs}{datafile}.substr(&nowsrl,0,4).'/'.substr(&nowsrl,0,6).'/'.substr(&nowsrl,0,8).'/'.$subhash{localdir};
					if (-e $info{localdir}) {
					} else {
						mkpath($info{localdir},0,0775);
					}
					my $sftp = Net::SFTP::Foreign->new($ftp{$subhash{ftp}}{server}, 
											  user => $ftp{$subhash{ftp}}{account},
											  password => $ftp{$subhash{ftp}}{password},
											  port => $ftp{$subhash{ftp}}{port},
											  timeout => 300,
											  autodie => 1);
					#$sftp->die_on_error("unable to connect to remote host");
					&printlog($logger{init}{logfile},&nowtime." search: $subhash{remotedir} \r\n");
					$sftp->setcwd($subhash{remotedir});
					$info{ls} = $sftp->ls("./" ,wanted => sub {
										my $entry = $_[1];
										#print Dumper($_[1]);
										my %fileinfo;
										$fileinfo{Rfile} = $_[1]->{filename};
										$fileinfo{atime} = $entry->{a}->{mtime};
										$fileinfo{ss} = time() - $fileinfo{atime};
										if ($subhash{filekey}) {
											my @fileana = split(/_/,$fileinfo{Rfile});
											$fileinfo{filekey} = $fileana[$subhash{keypos}];
										}
										#print Dumper($_[1])."\r\n";
										if ($fileinfo{ss} > $subhash{scanagin} ) {
											if ($subhash{filekey}) {
				                            	if ($fileinfo{filekey} eq $subhash{filekey}) {
				                            		S_ISREG($entry->{a}->perm)
				                            	}
											} else {
												S_ISREG($entry->{a}->perm)
											}
										}
								#print Dumper(\%fileinfo);
	                     		});
	                &printlog($logger{init}{logfile},&nowtime." find ".@{$info{ls}}." files ...\r\n");     		
					foreach my $key ( @{$info{ls}} ) {
						my %subkey = %{$key};
						push @{$info{filelist}} , $subkey{filename};
						push @{$info{filename}} , $subhash{remotedir}.$subkey{filename};
						#print Dumper($subhash{filename})."\r\n";
					}
					
					if ($info{filename} && @{$info{filename}} > 0) {
						&printlog($logger{init}{logfile},&nowtime." Download ".@{$info{filename}}." files ...\r\n");
						try {
							$sftp->mget($info{filename},"$info{localdir}");
						} catch {
							&printlog($logger{init}{logfile},&nowtime." --->error:$_ \r\n");
						};
						&printlog($logger{init}{logfile},&nowtime." ---> Done \r\n");
					}
					
					#check download succ?
					if ($info{filelist} && @{$info{filelist}} > 0) {
						foreach my $key ( @{$info{filelist}} ) {
							my $localfile = $info{localdir}.$key;
							my $remotefile = $subhash{remotedir}.$key;
							my $errors;
							
							&printlog($logger{init}{logfile},&nowtime." check localfile: $localfile ");
							if (-e $localfile){
								#goto Process_End;
								#组成需要删除的文件列表
								#存在的元素重新组合，并删除远程文件,删除文件失败的，则也将本地文件删除，不入库
								&printlog($logger{init}{logfile}," ---> sucess \r\n");
								if ($remotefile) {
									$errors = 0;
									&printlog($logger{init}{logfile},&nowtime." delete remote file. $remotefile ..");
									$sftp->rremove($remotefile, on_error => sub { $errors++});
									if ($errors ==0 ) {
										&printlog($logger{init}{logfile}," --->sucess \r\n");
										push @{$info{listlocal}} , $localfile;
										#定义合并文件
										if (! $info{imp}{moveto}) {
											$info{imp}{filehandle} = "$subhash{name}"."_".&nowsrl;  #文件handle，用于sqlldr命名
											$info{imp}{filename} = $info{imp}{filehandle}.'.txt';#文件名
											$info{imp}{moveto} = $config{dirs}{tabletmp}.$info{imp}{filename};#完整路径文件
											&printlog($logger{init}{logfile},&nowtime." Create Import File: $info{imp}{moveto} \r\n");
										}
										&printlog($logger{init}{logfile},&nowtime." Append File $key --->>> $info{imp}{moveto}");
										# 合并文件
										open(FILE_DATA,"<",$localfile) || die "cannot open the file: $localfile \n";
										open(FILE_MOVE,">>$info{imp}{moveto}");
										while (my $content = <FILE_DATA>){
											#$content;
											#chomp $content; 
											#$content =~ s/\s//g;
											$content =~ s/[\r\n]//g;
											if ($tabcfg{$subhash{table}}{addcolumn}) {
												$content = &addcolumn(substr($content,0,10)).$content.'$$'.$info{imp}{filename}."\n";
											} else {
												$content = $content.'$$'.$info{imp}{filename}."\n";
											}
											print FILE_MOVE $content ;# 写入文件，主程序须要open FILE_LOG文件句柄
										}
										close FILE_DATA;
										close FILE_MOVE;
										&printlog($logger{init}{logfile}," --->>> Succ \r\n");
									} else {
										&printlog($logger{init}{logfile}," --->failed,delete local file");
										unlink($localfile);
										&printlog($logger{init}{logfile}," --->done \r\n");
									}
								}
							} else {
								&printlog($logger{init}{logfile}," failed \r\n");
								#从数据中移出
							}
						}
					}
					#print Dumper(\%info)."\r\n";	
					$sftp->disconnect;
												  
											  
				} elsif ($ftp{$subhash{ftp}}{type} eq "ftp") {
					
				}
			} catch {
				&printlog($logger{init}{logfile},&nowtime." $_ \r\n");
			};
				#import to database
			if ($info{imp}{moveto}) {
				try {
					my $loadres=1;
					my $loadtmp=0;
						foreach my $element (@{$subhash{db}})
						{
							if ($dbcfg{$element}{oratype} eq "oracle") {
								$loadtmp = func_sqlldr($info{imp}{filehandle}, $tabcfg{$subhash{table}}, $dbcfg{$element},$logger{init}{logfile}, $config{dirs}{tabletmp}, \%subhash);
							} elsif ($dbcfg{$element}{oratype} eq "mysql") {
								$loadtmp = func_mysqlldr($info{imp}{filehandle}, $tabcfg{$subhash{table}}, $dbcfg{$element},$logger{init}{logfile}, $config{dirs}{tabletmp}, \%subhash);
							}
							if ($loadtmp) {
								next;
							} else {
								$loadres = 0;
							}
						}
					if ($loadres) {
						#成功导入删除临时文件
						&printlog($logger{init}{logfile}, &nowtime." Load All Successful and Remove temp file ");
						my $lsfile = $config{dirs}{tabletmp}.$info{imp}{filehandle}.'.txt';
						my $lsctl = $lsfile.'.ctl';
						my $lslog = $lsfile.'.log';
						unlink($lsfile);
						unlink($lsctl);
						unlink($lslog);
						&printlog($logger{init}{logfile}, " Succ !~ \r\n");
					} else {
						&printlog($logger{init}{logfile}, &nowtime." Load failed and not Remove temp file \r\n");
					}
				} catch {
					&printlog($logger{init}{logfile},&nowtime." import Error $info{imp}{moveto} ... $_ \r\n ");
				};
			}
		} else {
			&printlog($logger{init}{logfile}," ---> invalid:Skip \r\n");
		}
	}
	
	sub callback 
	{
		my($sftp, $data, $offset, $size) = @_;
    	print "Read $offset / $size bytes\r";
	}
	
	#&checkdir($config{dirs}{scan},$logger{init}{logfile},$config{dirs}{tabletmp},\%tabcfg,\%dbcfg);
	
#	&printlog($logger{init}{logfile},&nowtime." Scan Successful \n");
#	&printlog($logger{init}{logfile},&nowtime." Check Process \n");
#	foreach my $key ( keys %platcfg ) {
#		my $subhash = $platcfg{$key};
#		my %subhash = %$subhash;
#		#print Dumper(\%subhash);
#		if ($subhash{filecnt}) {
#			&printlog($logger{init}{logfile},&nowtime." $subhash{platname} --> $subhash{platid} --> $subhash{runscript}");
#			#判断进程是否启动(判断文件锁)
#			my $lockfile = $subhash{dirs}.$subhash{runlock};
#			if (-e $lockfile) {
#				&printlog($logger{init}{logfile}," runing,skip \n");
#				#next;
#			} else {
#				&printlog($logger{init}{logfile}," not run start... ");
#				my $runfile = $subhash{dirs}.$subhash{runscript};
#				if (-e $runfile) {
#					system("cd $subhash{dirs};nohup $runfile >/dev/null 2>&1 &");
#					&printlog($logger{init}{logfile}," Done \n");
#				} else {
#					&printlog($logger{init}{logfile}," Not Found The Script \n");
#				}
#			}
#			#防止启动过快，runtime还没生成
#			sleep 10;
#		}
#		
#	}
	#print Dumper(\%platcfg);
	&printlog($logger{init}{logfile},&nowtime." Check Process End,and waiting 15mins again \n");
	sleep 60*15;
	goto Process_Start;
	
Process_End:

	&printlog($logger{init}{logfile},&nowtime." Found Exit Ctrl..Exiting.\r\n");
	unlink($config{files}{lockfile});
	unlink($config{files}{exitfile});

} catch {
	print "Process Error $_ \r\n";
	#&printlog($logger{init}{logfile},&nowtime." $_ \r\n");
	
};


